Flink: Refactor to use the BaseEqualityDeltaWriter. #4264
openinx wants to merge 18 commits into apache:main from
Conversation
```java
private final int[] equalityFieldIds;
private final SortOrder sortOrder;
private DeleteFile deleteFile = null;
private long rowOffset = 0;
```
How about the name recordCount or rowCount?
IMO rowOffset is specific to tracking records written to a file; recordCount is more general.
I see that BaseFile is using recordCount, so I think it's better to keep this consistent with BaseFile's recordCount :-)
FYI @rdblue , @aokolnychyi , @dungdm93 . I think this draft PR is ready to review now. Mind taking a look when you have a chance?
```diff
  * @param partition a partition or null if the spec is unpartitioned
  */
- void write(T row, PartitionSpec spec, StructLike partition);
+ PathOffset write(T row, PartitionSpec spec, StructLike partition);
```
Since the PartitioningWriter will have multiple partition writers in its internal implementation, we cannot just introduce these two methods in this interface:

```java
/**
 * Returns the file path that are currently opened.
 *
 * @return the current file path.
 */
CharSequence location();

/**
 * Returns the row offset that are currently writing, starting from 0.
 *
 * @return the current row offset.
 */
long rowOffset();
```

Because location() and rowOffset() are bound to specific writers, different writers should in theory return different location() and rowOffset() values. So we add a PathOffset return value here to get the latest written path & offset for the written row.
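The idea above can be sketched as a small value type (the names and shape here are illustrative, not necessarily the PR's actual `PathOffset` class): it pairs the data file path with the row's position in that file, so each `write(...)` can report where the row landed even though the `PartitioningWriter` fans out to many underlying file writers.

```java
// Illustrative sketch of a PathOffset value type: pairs a file path
// with a row's offset inside that file, returned per written row.
final class PathOffset {
  private final CharSequence path;
  private final long rowOffset;

  private PathOffset(CharSequence path, long rowOffset) {
    this.path = path;
    this.rowOffset = rowOffset;
  }

  static PathOffset of(CharSequence path, long rowOffset) {
    return new PathOffset(path, rowOffset);
  }

  CharSequence path() {
    return path;
  }

  long rowOffset() {
    return rowOffset;
  }
}
```

This lets the caller record exactly which file and position a row was written to, which is what a later position delete needs to reference.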
```java
import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.StructProjection;

public class BaseEqualityDeltaWriter<T> implements EqualityDeltaWriter<T> {
```
Are there any style guidelines for documentation? Typically it's nice to have Javadoc on all public classes/members that aren't overridden.
```java
public class BaseEqualityDeltaWriter<T> implements EqualityDeltaWriter<T> {

  private final ThreadLocal<PositionDelete<T>> posDelete = ThreadLocal.withInitial(PositionDelete::create);
```
Maybe document why a thread local is used here?
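For context, a minimal sketch of the reuse pattern behind that ThreadLocal (all names here are hypothetical stand-ins, not the PR's code): one mutable, reusable holder is kept per thread, so rows can be set in place without per-row allocation and without races if several threads ever go through the same writer.

```java
// Illustrative sketch: a mutable holder reused via ThreadLocal,
// mimicking how a PositionDelete instance is set in place per row.
final class Reusable<T> {
  private T value;

  Reusable<T> set(T value) {  // mutate in place instead of allocating per row
    this.value = value;
    return this;
  }

  T value() {
    return value;
  }
}

class PosDeleteHolder {
  // withInitial(...) lazily creates one holder per accessing thread.
  static final ThreadLocal<Reusable<String>> POS_DELETE =
      ThreadLocal.withInitial(Reusable::new);
}
```

Each thread sees its own instance, and repeated `get()` calls on the same thread return that same instance, which is exactly what makes the in-place `set(...)` safe.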
```java
void write(T row);

/**
 * Returns the file path that are currently opened.
```
```diff
- * Returns the file path that are currently opened.
+ * Returns the file path that is currently opened.
```
Is "opened" important here? What happens if the writer is closed?
```java
CharSequence location();

/**
 * Returns the row offset that are currently writing, starting from 0.
```
```diff
- * Returns the row offset that are currently writing, starting from 0.
+ * Returns the row offset that will be written to next, starting from 0.
```
Is this interface intended to be seekable at any point? If not, maybe call this rowsWritten, which might be clearer?
As we've discussed in https://github.com/apache/iceberg/pull/4264/files#r819238913, I agree recordCount is a good & consistent name.
```java
if (!retireOldKey(asStructLike.apply(key), spec, partition)) {
  equalityWriter.write(key, spec, partition);
}
```
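This snippet is the delete half of an upsert: when `retireOldKey` fails (the key was not inserted by this writer, so no position delete can cover it), an equality delete is written instead. A minimal sketch of that decision, with hypothetical names (`insertedRowMap`, string keys) standing in for the PR's internals:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the insert-or-retire decision in an
// equality-delta writer; not the PR's actual API.
class EqualityDeltaSketch {
  // Tracks where each live key was last written by THIS writer: key -> {fileId, offset}.
  private final Map<String, long[]> insertedRowMap = new HashMap<>();
  final List<String> positionDeletes = new ArrayList<>();
  final List<String> equalityDeletes = new ArrayList<>();

  /** Returns true if the key was previously inserted by this writer and a
   *  position delete was emitted; false if an equality delete is needed. */
  boolean retireOldKey(String key) {
    long[] previous = insertedRowMap.remove(key);
    if (previous == null) {
      return false;  // key not written by this writer; caller emits an equality delete
    }
    positionDeletes.add(key + "@" + previous[0] + ":" + previous[1]);
    return true;
  }

  void delete(String key) {
    if (!retireOldKey(key)) {
      equalityDeletes.add(key);
    }
  }

  void insert(String key, long fileId, long offset) {
    insertedRowMap.put(key, new long[] {fileId, offset});
  }
}
```

Position deletes are cheaper to apply on read, which is why they are preferred whenever the previous row's location is known.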
I think this will need to be reconsidered because of this PR: #4364
CC @rdblue @aokolnychyi , would you mind taking a look when you have a chance?
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
This PR refactors the Flink write path to replace the old underlying writers with the partition-aware writers (introduced by @aokolnychyi ).
I was reviewing PR #4132 from @dungdm93, which tries to accomplish something similar to this one. I tried to rewrite it because I wanted to work through the details of the design for myself and get a better understanding of the new writers. In this way, I think I can provide more background & design details from my perspective. Once the high-level design is okay, I think we can split the whole PR into smaller ones and collaborate to get this work done @dungdm93 .